Skip to content

22 threading多线程

有些任务需要"同时"做多件事——比如一边下载文件一边处理数据、一边接收请求一边写日志。threading模块就是干这个的,它让你在一个进程里开多个线程,并行执行任务。

注意:Python有GIL(全局解释器锁),同一时刻只有一个线程在执行Python字节码。所以threading适合I/O密集型任务(网络请求、文件读写),不适合CPU密集型任务(大量计算)。CPU密集型任务用multiprocessing

一、创建线程

1.1 基本用法

python
import threading

def worker(num):
    print(f"线程 {num} 开始工作")
    # 模拟耗时操作
    import time
    time.sleep(2)
    print(f"线程 {num} 完成")

# 创建线程
t = threading.Thread(target=worker, args=(1,))

# 启动线程
t.start()

# 等待线程完成
t.join()

print("主线程继续")

1.2 多个线程

python
import threading

def worker(num):
    print(f"线程 {num} 开始")
    import time
    time.sleep(2)
    print(f"线程 {num} 完成")

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

print("所有线程完成")

1.3 继承Thread类

python
import threading

class MyThread(threading.Thread):
    def __init__(self, num):
        super().__init__()
        self.num = num

    def run(self):
        print(f"线程 {self.num} 开始")
        import time
        time.sleep(2)
        print(f"线程 {self.num} 完成")

# 使用
t = MyThread(1)
t.start()
t.join()

二、线程同步

2.1 Lock:互斥锁

保护共享资源,防止多个线程同时修改。

python
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        lock.acquire()  # 获取锁
        try:
            counter += 1
        finally:
            lock.release()  # 释放锁

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"结果: {counter}")  # 500000

更简洁的写法,用with语句:

python
def increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

2.2 RLock:可重入锁

同一线程可以多次获取锁,不会死锁。

python
import threading

rlock = threading.RLock()

def recursive_function(n):
    with rlock:
        if n > 0:
            print(f"递归 {n}")
            recursive_function(n - 1)

t = threading.Thread(target=recursive_function, args=(5,))
t.start()
t.join()

2.3 Condition:条件变量

线程间的通知机制。

python
import threading

condition = threading.Condition()
data_ready = False

def producer():
    global data_ready
    with condition:
        print("生产数据...")
        import time
        time.sleep(2)
        data_ready = True
        condition.notify()  # 通知消费者

def consumer():
    with condition:
        while not data_ready:
            condition.wait()  # 等待通知
        print("消费数据")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

2.4 Event:事件标志

简单的线程间通知。

python
import threading

event = threading.Event()

def waiter():
    print("等待事件...")
    event.wait()  # 阻塞直到事件被设置
    print("事件触发了!")

def setter():
    import time
    time.sleep(2)
    event.set()  # 触发事件

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()

2.5 Semaphore:信号量

限制同时访问资源的线程数量。

python
import threading

semaphore = threading.Semaphore(3)  # 最多3个并发

def limited_task(id):
    with semaphore:
        print(f"任务 {id} 开始")
        import time
        time.sleep(2)
        print(f"任务 {id} 完成")

threads = [threading.Thread(target=limited_task, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

2.6 Barrier:栅栏

等待所有线程到达某个点再一起继续。

python
import threading

barrier = threading.Barrier(3)

def worker(id):
    print(f"线程 {id} 准备就绪")
    barrier.wait()  # 等待其他线程
    print(f"线程 {id} 开始执行")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

三、Timer:定时器

python
import threading

def delayed_action():
    print("3秒后执行的操作")

# 3秒后执行
timer = threading.Timer(3.0, delayed_action)
timer.start()

print("主线程继续执行")
timer.join()

四、线程信息

4.1 获取当前线程

python
import threading

def worker():
    current = threading.current_thread()
    print(f"当前线程: {current.name}, ID: {current.ident}")

t = threading.Thread(target=worker, name="Worker-1")
t.start()
t.join()

4.2 列出所有线程

python
import threading

def worker():
    import time
    time.sleep(5)

threads = [threading.Thread(target=worker, name=f"Worker-{i}") for i in range(3)]
for t in threads:
    t.start()

# 列出所有活跃线程
for t in threading.enumerate():
    print(f"线程: {t.name}, 存活: {t.is_alive()}")

五、守护线程

守护线程在主线程结束时自动退出。

python
import threading
import time

def daemon_worker():
    while True:
        print("守护线程运行中...")
        time.sleep(1)

# daemon=True:设为守护线程
t = threading.Thread(target=daemon_worker, daemon=True)
t.start()

time.sleep(3)
print("主线程结束")
# 守护线程会随主线程一起退出

六、实战场景

6.1 并发下载

python
import threading
import time

def download(url):
    print(f"开始下载 {url}")
    time.sleep(2)  # 模拟下载
    print(f"完成下载 {url}")
    return f"{url} 的内容"

urls = ["url1", "url2", "url3", "url4", "url5"]

threads = []
for url in urls:
    t = threading.Thread(target=download, args=(url,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("所有下载完成")

6.2 生产者-消费者

python
import threading
import queue

q = queue.Queue()

def producer():
    for i in range(10):
        q.put(i)
        print(f"生产: {i}")
        time.sleep(0.5)

def consumer():
    while True:
        item = q.get()
        if item is None:  # 结束信号
            break
        print(f"消费: {item}")
        q.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
q.put(None)  # 发送结束信号
t2.join()

七、常见问题

7.1 竞态条件

python
# 错误:不加锁会导致数据不一致
counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # 不是原子操作

# 正确:加锁保护
lock = threading.Lock()

def safe_increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

7.2 死锁

python
# 错误:可能死锁
lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1():
    with lock1:
        with lock2:  # 等待lock2
            pass

def thread2():
    with lock2:
        with lock1:  # 等待lock1
            pass

# 正确:按相同顺序获取锁
def thread1():
    with lock1:
        with lock2:
            pass

def thread2():
    with lock1:  # 也先获取lock1
        with lock2:
            pass

八、总结

threading模块的核心:

组件用途
Thread创建线程
Lock互斥锁
RLock可重入锁
Condition条件变量
Event事件标志
Semaphore信号量
Barrier栅栏同步
Timer定时器

使用场景:

  • I/O密集型任务(网络请求、文件读写)
  • 需要"同时"处理多个任务
  • 后台任务、定时任务

记住:Python的threading受GIL限制,同一时刻只有一个线程执行Python代码。CPU密集型任务用multiprocessing